package org.sn.jdish.spark.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * 对<key, value>结构的RDD进行升序或降序排列
 * 
 * @author snzigod@hotmail.com
 *
 */
public class SortByKey {

	public static void main(String[] args) {
		/**
		 * SparkConf:第一步创建一个SparkConf，在这个对象里面可以设置允许模式Local Standalone yarn
		 * AppName(可以在Web UI中看到) 还可以设置Spark运行时的资源要求
		 */
		SparkConf conf = new SparkConf().setAppName("sparkOperator").setMaster("local");

		/**
		 * 基于SparkConf的对象可以创建出来一个SparkContext Spark上下文
		 * SparkContext是通往集群的唯一通道.SparkContext在创建的时候还会创建任务调度器.
		 */
		JavaSparkContext sc = new JavaSparkContext(conf);

		sortByKey(sc);

		sc.close();
	}

	static void sortByKey(JavaSparkContext sc) {
		List<Integer> datas = Arrays.asList(60, 70, 80, 55, 45, 75);

		/**
		 * sortBy对RDD进行升序或降序排列.comp：排序时的比较运算方式。ascending：false降序；true升序。
		 */
		JavaRDD<Integer> sortByRDD = sc.parallelize(datas).sortBy(new Function<Integer, Integer>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer t) throws Exception {
				return t;
			}
		}, true, 1);

		sortByRDD.foreach(new VoidFunction<Integer>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Integer t) throws Exception {
				System.out.println(t);
			}
		});

		List<Tuple2<Integer, Integer>> datas2 = new ArrayList<>();
		datas2.add(new Tuple2<>(3, 3));
		datas2.add(new Tuple2<>(2, 2));
		datas2.add(new Tuple2<>(1, 4));
		datas2.add(new Tuple2<>(2, 3));

		/**
		 * sortByKey对RDD进行升序或降序排列。ascending：false降序；true升序。
		 */
		sc.parallelizePairs(datas2).sortByKey(false).foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Tuple2<Integer, Integer> v) throws Exception {
				System.out.println(v._1 + "==" + v._2);
			}
		});
	}

}
